/**
 * Copyright 2015年4月2日 Wang Zheng
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @author Wang Zheng ufo5260987423@163.com
 *
 */
package com.ufo5260987423.graphDatabase.core.connector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.ufo5260987423.graphDatabase.core.lifecycle.LifeCycleInf;

/**
 * @ClassName: NetConnector
 * @Description: TODO
 * @author Wang Zheng ufo5260987423@163.com
 * @date 2015年4月2日 下午5:25:35
 * 
 */
public class NetConnector extends Thread implements LifeCycleInf {
    static List<AsynchronousSocketChannel> channelList = new ArrayList<AsynchronousSocketChannel>();
    static NetConnector netConnector;

	private String ip = "127.0.0.1";
	private static String encode = "utf-8";
	private Integer amountOfThread = 5;
	private Integer port = 31413;
	private REPL_Connector repl_Connector;

    protected  AsynchronousServerSocketChannel serverChannel;

	@Override
	public Boolean loadConf() {
		System.err.println("netConnector loading");
        netConnector=this;

        ExecutorService executor= Executors.newFixedThreadPool(this.getAmountOfThread());
        try {
            AsynchronousChannelGroup channelGroup=AsynchronousChannelGroup.withThreadPool(executor);
            serverChannel=AsynchronousServerSocketChannel.open(channelGroup)
                    .bind(new InetSocketAddress(this.getIp(),this.getPort()));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
	}

	public void run() {
		System.err.println("run netConnector");
        serverChannel.accept(null,new AcceptHandler(serverChannel));
	}

	public String getIp() {
		return ip;
	}

	public void setIp(String ip) {
		this.ip = ip;
	}

	public String getEncode() {
		return encode;
	}

	public static void setEncode(String encode) {
		NetConnector.encode = encode;
	}

	public Integer getAmountOfThread() {
		return amountOfThread;
	}

	public void setAmountOfThread(Integer amountOfThread) {
		this.amountOfThread = amountOfThread;
	}

	public Integer getPort() {
		return port;
	}

	public void setPort(Integer port) {
		this.port = port;
	}

	public REPL_Connector getRepl_Connector() {
		return repl_Connector;
	}

	public void setRepl_Connector(REPL_Connector repl_Connector) {
		this.repl_Connector = repl_Connector;
	}
}
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
    private AsynchronousServerSocketChannel serverChannel;

    ByteBuffer buff=ByteBuffer.allocate(10240);
    public AcceptHandler(AsynchronousServerSocketChannel sc){
       this.setServerChannel(sc);
    }

    @Override
    public void completed(final AsynchronousSocketChannel sc,Object attachment){
        NetConnector.channelList.add(sc);
        serverChannel.accept(null, this);
        sc.read(buff, null,
                new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        buff.flip();
                        String content=StandardCharsets.UTF_8.decode(buff).toString();
                        try {
                            sc.write(ByteBuffer.wrap(NetConnector.netConnector.
                                    getRepl_Connector().eval_string(content).getBytes()));
                            buff.clear();
                            sc.read(buff,null,this);
                        } catch (Exception e) {
                            e.printStackTrace();
                            if(sc.isOpen()&&!content.isEmpty())
                                sc.write(ByteBuffer.wrap((content.split(" ")[0] + " nil} \r").getBytes()));
                            this.failed(e,null);
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.err.println("fail to read:" + exc);
                        NetConnector.channelList.remove(sc);
                    }
                });
    }

    @Override
    public void failed(Throwable ex,Object attachment){
        System.err.println("fail to read:" + ex);
    }

    public AsynchronousServerSocketChannel getServerChannel() {
        return serverChannel;
    }

    public void setServerChannel(AsynchronousServerSocketChannel serverChannel) {
        this.serverChannel = serverChannel;
    }
}